Skip to content

Conversation

@zzstoatzz
Copy link

@zzstoatzz zzstoatzz commented May 22, 2025

i haven't ruled out me misinterpreting something, but here's what it seemed like to me:

the current implementation appears to write cursor=0 to the database when starting fresh, even though the client fetches from the latest firehose data, causing a full history replay on restart

this PR removes this seemingly early write and upserts to save the actual received cursor

see my chronicled misfortunes

import os

import peewee

# SQLite file backing this reproduction; demonstrate_bug() removes it before and after the run.
DATABASE_FILE = "mre_concise_cursor_bug.sqlite"
# Value of the `service` column used to look up (and create) the SubscriptionState row.
SERVICE_NAME = "example_feed_service"

# Module-level peewee database handle; it is connected and closed inside demonstrate_bug().
db = peewee.SqliteDatabase(DATABASE_FILE)


class BaseModel(peewee.Model):
    """Common base for this script's models; its Meta points at the module-level `db`."""

    class Meta:
        database = db


class SubscriptionState(BaseModel):
    """Stand-in for the application's SubscriptionState model.

    Stores a service name together with the cursor value saved for it.
    """

    # Service identifier; declared unique.
    service = peewee.CharField(unique=True)
    # Cursor value saved for the service.
    cursor = peewee.BigIntegerField()

    def __str__(self):
        """Return a short human-readable rendering of the row for the printed trace."""
        service_name, saved_cursor = self.service, self.cursor
        return f"<SubscriptionState service='{service_name}', cursor={saved_cursor}>"


# --- Firehose Client Simulation ---
class SimulatedFirehoseClient:
    """Minimal stand-in for a firehose client that only records where it would start.

    Attributes:
        effective_start_point: The string "LATEST (HEAD)" when no cursor was
            supplied, otherwise the supplied cursor value unchanged.
    """

    def __init__(self, cursor_param=None):
        """Record and print the start point implied by `cursor_param`.

        Only `None` counts as "no cursor"; a cursor of 0 is kept as 0.
        """
        if cursor_param is not None:
            start_point = cursor_param
        else:
            start_point = "LATEST (HEAD)"
        self.effective_start_point = start_point
        print(f"  FirehoseClient: Will request from {start_point}")


# --- Application Logic Simulation ---
def run_firehose_initialization_simulation(step_name):
    """Simulate one application start-up and report the resulting cursor state.

    The steps mirror the behaviour under investigation: read the stored
    SubscriptionState, build a client from it, and -- when no row exists yet --
    create a row with ``cursor=0``. That write is kept on purpose: it is the
    suspected bug this script reproduces, not something to be fixed here.

    Args:
        step_name: Label printed as the heading for this run's output.

    Returns:
        A ``(client_start_point, stored_cursor)`` tuple: the start point the
        simulated client would request, and the cursor held in the database
        afterwards (``None`` if no row exists).
    """
    # "\n" rather than "\\n": the heading should be preceded by a blank line,
    # not by a literal backslash-n.
    print(f"\n--- {step_name} ---")

    initial_db_state = SubscriptionState.get_or_none(
        SubscriptionState.service == SERVICE_NAME
    )
    print(f"  App: DB state before client init: {initial_db_state}")

    # No stored row means the client is given no cursor at all.
    client_init_cursor = initial_db_state.cursor if initial_db_state else None
    client = SimulatedFirehoseClient(cursor_param=client_init_cursor)

    # HERE IS THE (POTENTIAL) PROBLEM: the row is written with cursor=0 even
    # though the client above was started without a cursor.
    if initial_db_state is None:
        print(
            f"  App: No DB state, creating SubscriptionState(service='{SERVICE_NAME}', cursor=0)"
        )
        SubscriptionState.create(service=SERVICE_NAME, cursor=0)

    final_db_state = SubscriptionState.get_or_none(
        SubscriptionState.service == SERVICE_NAME
    )
    # Computed once and reused for both the summary line and the return value.
    stored_cursor = final_db_state.cursor if final_db_state else None

    print(f"  App: DB state after client init & potential create: {final_db_state}")
    print(
        f"  Outcome: Client targets '{client.effective_start_point}', DB stores cursor '{stored_cursor}'"
    )
    return client.effective_start_point, stored_cursor


def demonstrate_bug():
    """Run the two-step reproduction and print whether the bug pattern appears.

    Step 1 simulates a first start against an empty database; step 2 simulates
    a restart that reads whatever step 1 stored. The bug pattern is matched
    when the first run targets "LATEST (HEAD)" while the database ends up with
    cursor 0, and the restart then targets cursor 0.

    The SQLite file is removed before the run and again afterwards, and the
    database connection is closed, even if a step raises.
    """
    print("MRE: Concise Firehose Cursor Bug Demonstration")
    # "\n" rather than "\\n" here and below: these are meant to be blank
    # lines in the output, not literal backslash-n characters.
    print(
        "Illustrates premature `cursor=0` write vs. client's actual firehose start point.\n"
    )

    # Start from a clean slate so step 1 really sees an empty database.
    if os.path.exists(DATABASE_FILE):
        os.remove(DATABASE_FILE)

    try:
        # Connecting and creating tables happen inside the try so that a
        # failure here still reaches the cleanup in `finally`.
        db.connect()
        db.create_tables([SubscriptionState])

        s1_client_target, s1_db_cursor = run_firehose_initialization_simulation(
            "Step 1: First App Run (Empty DB)"
        )
        s2_client_target, s2_db_cursor = run_firehose_initialization_simulation(
            "Step 2: App Restart (Reads DB from Step 1)"
        )

        print("\n--- Bug Analysis ---")
        bug_is_present = (
            s1_client_target == "LATEST (HEAD)"
            and s1_db_cursor == 0
            and s2_client_target == 0
            and s2_db_cursor == 0
        )
        if bug_is_present:
            print("  Result: Bug Confirmed.")
            print(
                f"    - First run: Client aims for '{s1_client_target}', but DB is set to '{s1_db_cursor}'."
            )
            print(
                f"    - Restart: Client incorrectly aims for '{s2_client_target}' due to DB state."
            )
            print(
                "    - Implication: App would try to process all history from cursor 0."
            )
        else:
            print("  Result: Bug pattern not matched. Please review output.")

    finally:
        if not db.is_closed():
            db.close()
        if os.path.exists(DATABASE_FILE):
            os.remove(DATABASE_FILE)
            print(f"\n(Cleaned up {DATABASE_FILE})")


# Run the demonstration only when this file is executed as a script, not on import.
if __name__ == "__main__":
    demonstrate_bug()

omit unrelated lints
@MarshalX
Copy link
Owner

What great research! Awesome job!

I was a little bit confused by this part:

However, this 9447863000 might correspond to a point in time after the app had already processed some initial messages during its first run (when it was listening from head but before this first save). This could result in a small gap of unprocessed posts. This also explained why "old posts from the time of deployment" would appear if that's when such a cursor got written.

I understand why the cursor could be slightly behind, which would lead to re-processing a range of already processed posts, but not to a gap of unprocessed posts as written above.

What is the complexity of this ON CONFLICT check? Does this ORM library generate proper SQL without extra overhead?

Would it be enough to check whether we are processing the first message from the firehose and, if so, update the cursor even when it is not a multiple of 1000 (`% 1000`)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants